package com.atguigu.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * Flink streaming job that extracts "user jump" (bounce) page views from the page log.
 *
 * <p>Pipeline position:
 * log server (.log) -> flume -> kafka -> flink(BaseLogApp) -> kafka
 * -> flink(DwdTrafficeUserJumpDetail) -> kafka
 *
 * <p>Processing steps:
 * <ol>
 *   <li>Read raw strings from the {@code dwd_traffic_page_log} topic.</li>
 *   <li>Parse each record as JSON; drop records that are malformed or lack the fields
 *       the downstream operators dereference ({@code ts}, {@code page}, {@code common.mid}).</li>
 *   <li>Assign event time from {@code ts} (2 s bounded out-of-orderness) and key by
 *       {@code common.mid}.</li>
 *   <li>Apply a CEP pattern: a page view without {@code last_page_id} strictly followed,
 *       within 10 s, by another page view without {@code last_page_id}.</li>
 *   <li>Emit the first event of every full match and of every timed-out partial match
 *       to the {@code dwd_traffic_user_jump_detail} topic.</li>
 * </ol>
 *
 * @author LinCong
 * @version 1.0 (2023/1/27 15:06)
 */
public class DwdTrafficeUserJumpDetail {

    /**
     * Job entry point: builds the pipeline described on the class and submits it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment.
        // NOTE(review): parallelism is hard-coded to 3 — presumably chosen to match the
        // source topic's partition count; confirm against the Kafka topic configuration.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 1.1 Checkpointing (currently disabled; kept as a template).
//        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
//        // Checkpoint timeout: a checkpoint not finished within 10 minutes is discarded (default 10 minutes).
//        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(120000L);
//        // Fixed-delay restart (max restart attempts, delay between attempts).
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // 1.2 State backend (currently disabled; kept as a template).
//        env.setStateBackend(new HashMapStateBackend());
//        System.setProperty("HADOOP_USER_NAME", "kevin");
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop3cluster/211126/ck");

        // 2. Create the source stream from the Kafka page-log topic.
        String topic = "dwd_traffic_page_log";
        String groupId = "user_jump_detail";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // 3. Convert each line to a JSON object, dropping dirty records.
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject pageLog = parsePageLog(value);
                // collect() is deliberately outside any try/catch so that failures raised by
                // downstream operators are not mistaken for (and swallowed as) dirty data.
                if (pageLog != null) {
                    out.collect(pageLog);
                }
            }
        });

        // 4. Extract event time and key by mid.
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS
                // watermark = max event time seen - allowed lateness (2 s); once the watermark
                // passes a timestamp, earlier data is considered complete and event-time
                // computations up to that point may fire.
                .assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                // "ts" is guaranteed non-null by parsePageLog, so unboxing is safe.
                                // Assumed to be epoch milliseconds — TODO confirm against the producer.
                                return element.getLong("ts");
                            }
                        }))
                // "common" and "mid" are guaranteed non-null by parsePageLog.
                .keyBy(json -> json.getJSONObject("common").getString("mid"));

        // 5. Define the CEP pattern sequence: two consecutive session-entry page views.
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start")
                .where(new SessionEntryCondition())
                .next("next")
                .where(new SessionEntryCondition())
                // Both events must occur within 10 s of each other.
                .within(Time.seconds(10));

        // 6. Apply the pattern to the keyed stream.
        PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);

        // 7. Extract events: full matches go to the main output, timed-out partial matches
        //    to the side output. In both cases the "start" event is the one emitted.
        OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {};
        SingleOutputStreamOperator<String> selectDS = patternStream.select(timeoutTag
                , new PatternTimeoutFunction<JSONObject, String>() {
                    @Override
                    public String timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp) throws Exception {
                        return pattern.get("start").get(0).toJSONString();
                    }
                }
                , new PatternSelectFunction<JSONObject, String>() {
                    @Override
                    public String select(Map<String, List<JSONObject>> pattern) throws Exception {
                        return pattern.get("start").get(0).toJSONString();
                    }
                });

        DataStream<String> timeOutDS = selectDS.getSideOutput(timeoutTag);

        selectDS.print("Select>>>>>>");
        timeOutDS.print("TimeOut>>>");

        // 8. Merge the matched and timed-out events.
        DataStream<String> unionDS = selectDS.union(timeOutDS);

        // 9. Write the result to Kafka.
        String targetTopic = "dwd_traffic_user_jump_detail";
        unionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));

        // 10. Launch the job.
        env.execute("DwdTrafficeUserJumpDetail");
    }

    /**
     * Parses one raw page-log line and verifies it carries every field the pipeline
     * dereferences later ({@code ts}, {@code page}, {@code common.mid}).
     *
     * @param value raw record read from Kafka; may be {@code null} or empty
     * @return the parsed object, or {@code null} if the record is null/empty (skipped
     *         silently) or malformed/incomplete (reported on {@code System.err})
     */
    private static JSONObject parsePageLog(String value) {
        if (value == null || value.isEmpty()) {
            return null;
        }
        try {
            JSONObject json = JSON.parseObject(value);
            if (json != null && json.getLong("ts") != null && json.getJSONObject("page") != null) {
                JSONObject common = json.getJSONObject("common");
                if (common != null && common.getString("mid") != null) {
                    return json;
                }
            }
            System.err.println("Dropping page log missing ts, page or common.mid: " + value);
        } catch (Exception e) {
            // Dirty data must not fail the job: report the cause and the raw record, then drop it.
            System.err.println("Dropping malformed page log (" + e + "): " + value);
        }
        return null;
    }

    /**
     * CEP condition accepting page views that have no {@code page.last_page_id},
     * i.e. the first page view of a session.
     */
    private static class SessionEntryCondition extends SimpleCondition<JSONObject> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(JSONObject value) throws Exception {
            JSONObject page = value.getJSONObject("page");
            return page != null && page.getString("last_page_id") == null;
        }
    }
}
